使用 ETCD 作为注册中心
Etcd 是什么?
Etcd 是一个使用一致性哈希算法 (Raft) 在分布式环境下的 key/value 存储服务(就是 Go 生态下的 Zookeeper)。利用 Etcd 的特性,应用程序可以在集群中共享信息、配置或作服务发现,Etcd 会在集群的各个节点中复制这些数据并保证这些数据始终正确。所以除了 Consul 之外,在 Go 生态中,还可以选择基于 Etcd 作为注册中心
配置环境
使用 Docker Compose 模拟启动一个 3 节点的 etcd 集群。
编辑 docker-compose.yml
文件,参考该 配置
使用 docker-compose up 启动集群之后使用 docker exec 命令登录到任一节点测试 etcd 集群。
# 检查集群是否配置好(下面的命令也可以通过 docker 这样外部执行)
$ docker exec 7efaa50ed64f /bin/sh -c 'etcdctl member list'
daf3fd52e3583ff, started, node3, http://172.16.238.102:2380, http://172.16.238.102:2379, false
422a74f03b622fef, started, node1, http://172.16.238.100:2380, http://172.16.238.100:2379, false
ed635d2a2dbef43d, started, node2, http://172.16.238.101:2380, http://172.16.238.101:2379, false
如果要进行键值的存储和读取的话,对应的交互指令如下
# 推送 key-value
$ etcdctl put hello world
OK
# 取得数据
$ etcdctl get hello
hello
world
# 可以只显示 value
$ etcdctl get hello --print-value-only
world
# 删除 key
$ etcdctl del hello
还可以通过新开一个终端窗口通过 watch 指令监听上述键值变更:
$ etcdctl watch hello
基于 ETCD 的服务发现原理
当我们的服务有一定规模之后,因为一个服务可能会被很多个服务依赖,我们就需要能够动态增减节点,而无需修改很多的调用方配置并重启。
整体的流程如下:
在 Go 中使用
这里使用官方给的 客户端
go get go.etcd.io/etcd/client/v3
put 命令用来设置键值对数据,get 命令用来根据 key 获取值。
import (
"context"
"testing"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
func TestConnet(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"172.16.238.101:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
t.Errorf("connect to etcd failed, err:%v\n", err)
return
}
t.Log("connect to etcd success")
defer cli.Close()
// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "q1mi", "dsb")
cancel()
if err != nil {
t.Errorf("put to etcd failed, err:%v\n", err)
return
}
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "q1mi")
cancel()
if err != nil {
t.Errorf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
t.Logf("%s:%s\n", ev.Key, ev.Value)
}
}
自己编写服务发现
服务发现的第一个工作就是从 ETCD 里面读取相应的服务配置
创建客户端
创建一个客户端
func NewEtcd(urls string) (*clientv3.Client, error) {
cfg := clientv3.Config{
Endpoints: strings.Split(urls, ","),
DialTimeout: 10 * time.Second,
}
username, password := config.GetEtcdUsername(), config.GetEtcdPassword()
if username != "" && password != "" {
cfg.Username = username
cfg.Password = password
}
cli, err := clientv3.New(cfg)
if err != nil {
return nil, fmt.Errorf("client.New err: %v", err)
}
return cli, nil
}
服务配置的读写
原理就是把服务配置通过 json 序列化的方式读写
const (
ROOT = "/"
SERVICE = "service"
DEFAULT_CLUSTER = "default"
)
type Config struct {
ServiceVersion string `json:"service_version"`
ServicePort string `json:"service_port"`
HttpPort string `json:"http_port"`
IsSsl bool `json:"is_ssl"`
}
type ServiceConfig struct {
EtcdServerUrl string
ServerName string
Config
}
func NewServiceConfig(etcdServerUrl, serverName string) *ServiceConfig {
return &ServiceConfig{
EtcdServerUrl: etcdServerUrl,
ServerName: serverName,
}
}
func (c *ServiceConfig) GetKeyName(serverName string) string {
return ROOT + SERVICE + "." + serverName + "." + DEFAULT_CLUSTER
}
读取配置
func (c *ServiceConfig) GetConfig() (*Config, error) {
cli, err := util.NewEtcd(c.EtcdServerUrl)
if err != nil {
return nil, fmt.Errorf("generating etcd client failed to %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
key := c.GetKeyName(c.ServerName)
serviceInfo, err := cli.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("etcd client Get key %s failed to %v", key, err)
}
var config Config
if len(serviceInfo.Kvs) > 0 {
err := json.Unmarshal(serviceInfo.Kvs[0].Value, &config)
if err != nil {
return nil, fmt.Errorf("json.Unmarshal err: %v", err)
}
}
if config.ServicePort == "" {
return nil, fmt.Errorf("servicePort is empty, key: %s", key)
}
return &config, nil
}
写入配置
func (c *ServiceConfig) WriteConfig(cf Config) error {
cli, err := util.NewEtcd(c.EtcdServerUrl)
if err != nil {
return fmt.Errorf("generating etcd client failed to %v", err)
}
key := c.GetKeyName(c.ServerName)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cfJson, err := marshalToString(&cf)
if err != nil {
return fmt.Errorf("json.MarshalToString err: %v", err)
}
_, err = cli.Put(ctx, key, cfJson)
if err != nil {
return fmt.Errorf("cli.Put err: %v", err)
}
return nil
}
func marshalToString(v interface{}) (string, error) {
b, err := json.Marshal(v)
return string(b), err
}
测试使用
package etcdconfig
import (
"context"
"reflect"
"testing"
"alsritter.icu/rabbit/internal/util"
)
func TestServiceConfig_GetConfig(t *testing.T) {
type fields struct {
EtcdServerUrl string
ServerName string
}
tests := []struct {
name string
fields fields
want *Config
wantErr bool
}{
{
"test read service config info from etcd",
fields{
EtcdServerUrl: "172.16.238.101:2379",
ServerName: "rabbit-test-read-server",
},
&Config{
ServiceVersion: "1.0",
ServicePort: "6060",
HttpPort: "7070",
IsSsl: true,
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &ServiceConfig{
EtcdServerUrl: tt.fields.EtcdServerUrl,
ServerName: tt.fields.ServerName,
}
cli, _ := util.NewEtcd(c.EtcdServerUrl)
key := c.GetKeyName(c.ServerName)
cli.Put(context.Background(), key, `{"service_version":"1.0","service_port":"6060","http_port":"7070","is_ssl":true}`)
got, err := c.GetConfig()
if (err != nil) != tt.wantErr {
t.Errorf("ServiceConfig.GetConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ServiceConfig.GetConfig() = %v, want %v", got, tt.want)
}
})
}
}
func TestServiceConfig_WriteConfig(t *testing.T) {
type fields struct {
EtcdServerUrl string
ServerName string
}
tests := []struct {
name string
fields fields
config Config
wantErr bool
}{
{
"test write service config info to etcd",
fields{
EtcdServerUrl: "172.16.238.101:2379",
ServerName: "rabbit-test-write-server",
},
Config{
ServiceVersion: "1.0",
ServicePort: "8080",
HttpPort: "9090",
IsSsl: true,
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &ServiceConfig{
EtcdServerUrl: tt.fields.EtcdServerUrl,
ServerName: tt.fields.ServerName,
}
cli, _ := util.NewEtcd(c.EtcdServerUrl)
key := c.GetKeyName(c.ServerName)
cli.Delete(context.Background(), key)
if err := c.WriteConfig(tt.config); (err != nil) != tt.wantErr {
t.Errorf("ServiceConfig.WriteConfig() error = %v, wantErr %v", err, tt.wantErr)
}
serviceInfo, _ := cli.Get(context.Background(), key)
if string(serviceInfo.Kvs[0].Value) != `{"service_version":"1.0","service_port":"8080","http_port":"9090","is_ssl":true}` {
t.Errorf("write failed")
}
})
}
}